package com.huan.hadoop.mr;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.db.DBConfiguration;
import org.apache.hadoop.mapreduce.lib.db.DBOutputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

/**
 * MapReduce driver that reads records from an input file and writes them into a
 * database table through {@link DBOutputFormat}.
 *
 * <p>The job is wired with {@code ReadFileMapper} and {@code WriteDbReducer}; the
 * reducer's output key type is {@code Student} and the target is the
 * {@code student} table with columns {@code student_id} and {@code student_name}.
 *
 * <p>Usage: {@code WriteDbDriver [generic options] [inputPath]}. When
 * {@code inputPath} is omitted, {@link #DEFAULT_INPUT_PATH} is used.
 *
 * @author huan.fu
 * @date 2023/7/16 - 11:08
 */
public class WriteDbDriver extends Configured implements Tool {

    /** Input file read when no path is supplied on the command line. */
    private static final String DEFAULT_INPUT_PATH = "/tmp/mr-read-from-db/part-m-00000";

    /** JDBC driver class handed to {@link DBConfiguration}. */
    private static final String JDBC_DRIVER = "com.mysql.cj.jdbc.Driver";
    /** JDBC connection URL of the target database. */
    private static final String JDBC_URL = "jdbc:mysql://localhost:3306/temp_work";
    /** Database user name. */
    private static final String JDBC_USER = "root";
    // NOTE(review): credentials are committed in source. Because ToolRunner applies
    // generic "-D key=value" options to the configuration after it is built here,
    // they can presumably be overridden at launch time (e.g. mapreduce.jdbc.password)
    // -- confirm, and consider moving the secret out of the code base entirely.
    private static final String JDBC_PASSWORD = "root@1993";

    /** Table that receives the reducer output. */
    private static final String TABLE_NAME = "student";
    /** Columns of {@link #TABLE_NAME} populated by the job, in insert order. */
    private static final String[] TABLE_COLUMNS = {"student_id", "student_name"};

    /**
     * Program entry point: prepares the JDBC settings, runs the job through
     * {@link ToolRunner} and exits the JVM with the job's status code.
     *
     * @param args command-line arguments; generic Hadoop options are consumed by
     *             {@link ToolRunner}, the remainder is passed to {@link #run(String[])}
     * @throws Exception if the job cannot be configured or submitted
     */
    public static void main(String[] args) throws Exception {
        // Build the configuration object.
        Configuration configuration = new Configuration();
        // Register the JDBC driver, URL, user and password required by this job.
        DBConfiguration.configureDB(configuration, JDBC_DRIVER, JDBC_URL, JDBC_USER, JDBC_PASSWORD);
        // Submit the program through ToolRunner.
        int status = ToolRunner.run(configuration, new WriteDbDriver(), args);
        // Terminate the JVM, reporting the job status as the exit code.
        System.exit(status);
    }

    /**
     * Configures and runs the job, blocking until it completes.
     *
     * @param args remaining command-line arguments; if the first one is present and
     *             not blank it is used as the input path, otherwise
     *             {@link #DEFAULT_INPUT_PATH} is read
     * @return {@code 0} when the job succeeds, {@code 1} otherwise
     * @throws Exception if the job cannot be configured or submitted
     */
    @Override
    public int run(String[] args) throws Exception {
        // Create the Job instance (configuration, job name).
        Job job = Job.getInstance(getConf(), WriteDbDriver.class.getName());
        // Class used to locate the jar that contains this MR program.
        job.setJarByClass(WriteDbDriver.class);

        // Mapper and reducer implementations.
        job.setMapperClass(ReadFileMapper.class);
        job.setReducerClass(WriteDbReducer.class);

        // Key/value types emitted by the map phase.
        job.setMapOutputKeyClass(NullWritable.class);
        job.setMapOutputValueClass(Student.class);

        // Key/value types emitted by the reduce phase, i.e. the final job output.
        job.setOutputKeyClass(Student.class);
        job.setOutputValueClass(NullWritable.class);

        // Input: the input format is left at the job default; only the path is set.
        FileInputFormat.setInputPaths(job, new Path(resolveInputPath(args)));

        // Output: rows go to the database instead of the file system.
        job.setOutputFormatClass(DBOutputFormat.class);
        DBOutputFormat.setOutput(job, TABLE_NAME, TABLE_COLUMNS);

        return job.waitForCompletion(true) ? 0 : 1;
    }

    /**
     * Picks the input path for the job.
     *
     * @param args remaining command-line arguments, may be {@code null} or empty
     * @return the first argument when it is present and not blank, otherwise
     *         {@link #DEFAULT_INPUT_PATH}
     */
    private static String resolveInputPath(String[] args) {
        if (args == null || args.length == 0 || args[0] == null) {
            return DEFAULT_INPUT_PATH;
        }
        String candidate = args[0].trim();
        // An empty string would make the Path constructor fail; fall back instead.
        return candidate.isEmpty() ? DEFAULT_INPUT_PATH : candidate;
    }
}
